package com.sweetdream.dataset.webloganalysis

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

/**
 * Title: Web log analysis.
 * Description: Flink batch (DataSet API) example that filters and joins three
 *              web-log relations (documents, ranks, visits), emitting ranks of
 *              matching documents that were not visited in 2007.
 * Date 2020/12/16
 */
object WebLogAnalysis {

  /**
   * Entry point. Reads three data sets (documents, ranks, visits), filters each,
   * joins documents with ranks on URL, then anti-joins against 2007 visits via
   * coGroup, keeping only ranks whose URL was never visited that year.
   *
   * Supported arguments: --documents, --ranks, --visits (input paths, '|'-delimited
   * CSV) and --output (result path); bundled sample data is used when omitted.
   */
  // FIX: procedure syntax `def main(...) { }` is deprecated (removed in Scala 3);
  // declare the explicit `: Unit =` return.
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    val params: ParameterTool = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    // 2. source: (url, text), (rank, url, avgDuration), (url, date) — TODO confirm
    // field semantics against WebLogData; inferred from the filters/joins below.
    val documents = getDocumentsDataSet(env, params)
    val ranks = getRanksDataSet(env, params)
    val visits = getVisitsDataSet(env, params)

    // 3. transformation
    // Keep only documents whose text contains both keywords.
    val filteredDocs = documents
      .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))

    // Keep only ranks above the threshold.
    val filteredRanks = ranks
      .filter(rank => rank._1 > 40)

    // Keep only visits whose date string starts with year 2007.
    val filteredVisits = visits
      .filter(visit => visit._2.substring(0, 4).toInt == 2007)

    // Join documents (field 0 = url) with ranks (field 1 = url); keep the rank tuple.
    val joinDocsRanks = filteredDocs
      .join(filteredRanks)
      .where(0)
      .equalTo(1)((doc, rank) => rank)
      .withForwardedFieldsSecond("*")

    // Anti-join: emit a rank only when the co-grouped visits side is empty,
    // i.e. the ranked URL had no visit in 2007.
    val result = joinDocsRanks
      .coGroup(filteredVisits)
      .where(1)
      .equalTo(0)((
                    ranks: Iterator[(Int, String, Int)],
                    visits: Iterator[(String, String)],
                    out: Collector[(Int, String, Int)]) =>
        if (visits.isEmpty) for (rank <- ranks) out.collect(rank))
      .withForwardedFieldsFirst("*")

    // 4. sink: write pipe-delimited CSV when --output is given, else print.
    if (params.has("output")) {
      result.writeAsCsv(params.get("output"), "\n", "|")
      env.execute("Scala WebLogAnalysis Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      result.print()
    }
  }

  /**
   * Returns the documents relation as (url, text) tuples.
   *
   * Reads the '|'-delimited file given by --documents, or falls back to the
   * bundled [[WebLogData.DOCUMENTS]] sample data.
   */
  private def getDocumentsDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[(String, String)] = {
    if (params.has("documents")) {
      env.readCsvFile[(String, String)](
        params.get("documents"),
        fieldDelimiter = "|",
        includedFields = Array(0, 1))
    } else {
      println("Executing WebLogAnalysis example with default documents data set.")
      println("Use --documents to specify file input.")
      // Sample rows are untyped arrays; cast each field into the tuple shape.
      val documents = WebLogData.DOCUMENTS map {
        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
      }
      env.fromCollection(documents)
    }
  }

  /**
   * Returns the ranks relation as (rank, url, avgDuration) tuples.
   *
   * Reads the '|'-delimited file given by --ranks, or falls back to the
   * bundled [[WebLogData.RANKS]] sample data.
   */
  private def getRanksDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[(Int, String, Int)] = {
    if (params.has("ranks")) {
      env.readCsvFile[(Int, String, Int)](
        params.get("ranks"),
        fieldDelimiter = "|",
        includedFields = Array(0, 1, 2))
    } else {
      println("Executing WebLogAnalysis example with default ranks data set.")
      println("Use --ranks to specify file input.")
      // Sample rows are untyped arrays; cast each field into the tuple shape.
      val ranks = WebLogData.RANKS map {
        case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
      }
      env.fromCollection(ranks)
    }
  }

  /**
   * Returns the visits relation as (url, date) tuples.
   *
   * Reads columns 1 and 2 of the '|'-delimited file given by --visits, or falls
   * back to the bundled [[WebLogData.VISITS]] sample data.
   */
  private def getVisitsDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[(String, String)] = {
    if (params.has("visits")) {
      env.readCsvFile[(String, String)](
        params.get("visits"),
        fieldDelimiter = "|",
        includedFields = Array(1, 2))
    } else {
      println("Executing WebLogAnalysis example with default visits data set.")
      println("Use --visits to specify file input.")
      // Sample rows are untyped arrays; cast each field into the tuple shape.
      val visits = WebLogData.VISITS map {
        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
      }
      env.fromCollection(visits)
    }
  }
}
